module firecracker_d.core.transport;

import std.stdio;
import std.socket;
import requests;
import std.format;
import core.time;
import core.stdc.errno;
import core.stdc.string;
import std.conv;


/***
 * NetworkStream implementation that talks over a Unix domain socket
 * (AddressFamily.UNIX, SocketType.STREAM).
 *
 * The `host` argument of connect() is interpreted as the filesystem path of
 * the socket; the `port` argument is only used in error messages.
 ***/
class UnixStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _facSocket;   // socket path used by dg() for new streams
        string   _bind;        // local address remembered by bind(string)
    }

    /***
     * Create a fresh stream socket of the given address family.
     * The open flag is raised only after the socket was created, so a
     * failing constructor does not leave the stream marked as open.
     ***/
    void open(AddressFamily fa) {
        s = new Socket(fa, SocketType.STREAM);
        __isOpen = true;
    }

    /// Underlying socket, or null when the stream is closed.
    @property Socket so() @safe pure {
        return s;
    }

    /// True when a socket exists and has been opened.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }

    /// True when the socket is open and connect() succeeded.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }

    /***
     * Close the socket (if open), reset the state flags and drop the
     * socket reference. Safe to call on an already closed stream.
     ***/
    override void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /***
     * bind() just remembers the address. The actual bind() call is made at
     * connect time, because there can be several connection attempts.
     ***/
    override void bind(string to) {
        _bind = to;
    }

    /***
     * Connect to the Unix socket whose path is given in `host`.
     *
     * Params:
     *   host    = filesystem path of the Unix domain socket
     *   port    = not used for the connection; appears in error messages only
     *   timeout = value applied to the socket's SNDTIMEO option
     *
     * Returns: this stream.
     *
     * Throws:
     *   ConnectError when the address cannot be built or no connection was made;
     *   Exception ("Could not connect to server.") when bind/setOption/connect
     *   raise a SocketException - the original exception is chained as the cause.
     ***/
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        Address[] addresses;
        __isConnected = false;
        try {
            addresses ~= new UnixAddress(host);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            try {
                open(AddressFamily.UNIX);
                if ( _bind !is null ) {
                    auto ad = new UnixAddress(_bind);
                    debug(requests) tracef("bind to %s", _bind);
                    s.bind(ad);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                // Go through close() rather than s.close(): it resets the
                // state flags and tolerates a null socket (open() may have
                // failed before the socket was created).
                close();
                throw new Exception("Could not connect to server.", e);
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /***
     * Send `buff` over the connected socket.
     *
     * Returns: number of bytes sent.
     * Throws: NetworkException on a send error; the stream is closed first.
     ***/
    override ptrdiff_t send(const(void)[] buff)
    in {assert(__isConnected);}
    body {
        auto rc = s.send(buff);
        if (rc < 0) {
            // Save errno before close(), which may overwrite it.
            auto e = errno;
            close();
            throw new NetworkException("sending data: %s".format(to!string(strerror(e))));
        }
        return rc;
    }

    /***
     * Receive data into `buff`.
     *
     * Returns: the value returned by the socket's receive() (number of bytes
     *          read) when it is not negative.
     * Throws:
     *   TimeoutException when the call fails with EAGAIN (Posix) or with
     *   errno == 0 (Windows); NetworkException on any other error.
     *   The stream is closed before either exception is thrown.
     *   On Posix an EINTR failure is retried.
     ***/
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // Saved immediately: close() below may overwrite errno.
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the receive timeout (RCVTIMEO); ignored while not connected.
    @property void readTimeout(Duration timeout) @safe {
        if ( __isConnected ) {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
        }
    }

    /// Not implemented: always fails with an assertion.
    override NetworkStream accept() {
        assert(false, "Implement before use");
    }

    /// Enable or disable the REUSEADDR option on the current socket.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }

    /// Bind the current socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }

    /// Put the current socket into listening mode with a backlog of `n`.
    override void listen(int n) {
        s.listen(n);
    }

    /// Remember the socket path that dg() connects new streams to.
    void setFactorySocket(string socket) {
        _facSocket = socket;
    }

    /***
     * Stream factory: creates a new UnixStream connected to the path set by
     * setFactorySocket(). The scheme, host and port arguments are ignored.
     ***/
    NetworkStream dg(string scheme, string host, ushort port) {
        auto stream = new UnixStream();
        stream.connect(_facSocket, 0);
        return stream;
    }

}